3
0
Fork 0
forked from mirrors/nixpkgs
nixpkgs/pkgs/development/python-modules/apache-beam/fix-cython.patch
2022-01-07 18:01:35 +01:00

42 lines
1.4 KiB
Diff

diff --git a/apache_beam/runners/worker/operations.py b/apache_beam/runners/worker/operations.py
index 3464c5750c..5921c72b90 100644
--- a/apache_beam/runners/worker/operations.py
+++ b/apache_beam/runners/worker/operations.py
@@ -69,18 +69,6 @@ if TYPE_CHECKING:
from apache_beam.runners.worker.statesampler import StateSampler
from apache_beam.transforms.userstate import TimerSpec
-# Allow some "pure mode" declarations.
-try:
- import cython
-except ImportError:
-
- class FakeCython(object):
- @staticmethod
- def cast(type, value):
- return value
-
- globals()['cython'] = FakeCython()
-
_globally_windowed_value = GlobalWindows.windowed_value(None)
_global_window_type = type(_globally_windowed_value.windows[0])
@@ -149,7 +137,7 @@ class ConsumerSet(Receiver):
# type: (WindowedValue) -> None
self.update_counters_start(windowed_value)
for consumer in self.consumers:
- cython.cast(Operation, consumer).process(windowed_value)
+ consumer.process(windowed_value)
self.update_counters_finish()
def try_split(self, fraction_of_remainder):
@@ -345,7 +333,7 @@ class Operation(object):
def output(self, windowed_value, output_index=0):
# type: (WindowedValue, int) -> None
- cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
+ self.receivers[output_index].receive(windowed_value)
def add_receiver(self, operation, output_index=0):
# type: (Operation, int) -> None