Robert Medeiros — 07/19/2019, 10:23 PM

LucaSoato — 07/22/2019, 8:20 AM

Chris White

Wai Kiat Tan — 07/24/2019, 7:03 AM
[2019-07-24 06:56:03,439] WARNING - prefect.TaskRunner | Task 'x': can't use cache because it is now invalid
Any idea?
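
For context, a hedged sketch of how task-level caching is typically declared in Prefect 0.x; the warning above is what the TaskRunner logs when a previously cached state no longer passes its validator (for example, the cache_for window has expired), in which case the task simply re-runs.

from datetime import timedelta
from prefect import task

# Cache the result for one hour; once the window expires the cached state fails
# validation, the "can't use cache" warning is logged, and the task runs again.
@task(cache_for=timedelta(hours=1))
def x():
    return 42
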
orcaman — 07/24/2019, 8:44 AM
flow.sorted_tasks function and output that result to a CSV to be able to compare the two projects, but it seems that the function returns different files even for the same program (I guess the internal sorting mechanism doesn't generate the same sort at every run). Any ideas would be appreciated. Thanks!
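
A possible workaround sketch (illustrative, not from the thread): flow.sorted_tasks() returns a valid topological order, but ties between independent tasks are not guaranteed to break the same way on every run, so re-sorting on a deterministic key before writing the CSV makes the two files comparable.

import csv

def dump_tasks(flow, path):
    # sorted_tasks() is topological, but equal-rank tasks can come back in a
    # different order each run, so sort again on a stable key before writing.
    tasks = sorted(flow.sorted_tasks(), key=lambda t: (t.name, type(t).__name__))
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["name", "type"])
        for t in tasks:
            writer.writerow([t.name, type(t).__name__])
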

Mathis Börner — 07/24/2019, 10:00 AM

Mathis Börner — 07/24/2019, 10:04 AM

Mathis Börner — 07/24/2019, 10:29 AM
import time

import prefect
from prefect import task, Flow, Parameter
from prefect import tags
from prefect.engine import signals
from prefect.environments.storage import LocalStorage
from prefect.environments import LocalEnvironment
from prefect.utilities.tasks import pause_task
from prefect.engine.state import Resume


@task
def hold(cfg):
    # pause here until the task is resumed with a Resume state
    pause_task()
    return cfg


@task
def process(cfg):
    try:
        return prefect.context.key + cfg['value']
    except Exception:
        raise signals.FAIL()


def prepare_flow(flow, cfg, key, name, storage):
    # run once with the key available in context, then store the flow under a new name
    with prefect.context(key=key):
        flow.run(cfg=cfg)
    previous_name, flow.name = flow.name, name
    storage_name = storage.add_flow(flow)
    flow.name = previous_name
    return storage_name


def run_flow(flow_name, storage):
    flow_test = storage.get_flow(flow_name)
    # pre-set Resume states so the tasks tagged 'holdpoint' don't pause again
    resume_states = {t: Resume()
                     for t in flow_test.get_tasks(tags='holdpoint')}
    flow_test.run(task_states=resume_states)


if __name__ == '__main__':
    env = LocalEnvironment()
    storage = LocalStorage('storage')

    with Flow('Using Context', environment=env) as flow:
        cfg = Parameter('cfg')
        with tags('holdpoint'):
            cfg = hold(cfg)
        process(cfg)

    cfg = {'value': 'def'}
    key = 'abc'
    flow_name = prepare_flow(flow, cfg, key, 'test_flow', storage)
    time.sleep(5)
    run_flow(flow_name, storage)

Mathis Börner — 07/24/2019, 10:29 AM

Brian McFeeley — 07/24/2019, 8:50 PM
@task
def do_something(x, y):
    # X is the same for each invocation
    # Y is the result we're mapping over
    ...

I tried something like foo.map("two", "parameters"), but when I print from inside these functions, only the second param contains what I expect.
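
A minimal sketch of the usual pattern for this, using unmapped to keep one argument constant while mapping over the other (the task and values below are illustrative):

from prefect import Flow, task
from prefect.utilities.tasks import unmapped

@task
def do_something(x, y):
    return f"{x}-{y}"

with Flow("unmapped example") as flow:
    # x is passed whole to every mapped call; only y is iterated over
    results = do_something.map(x=unmapped("two"), y=[1, 2, 3])

flow.run()
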

Joe Schmid — 07/25/2019, 3:01 PM
result_handler and having write() & read() functions called? (Mainly interested in result_handler on a Task, but Flow-level or overall default is fine, too.) I can't seem to get my ResultHandler called and I'm sure it's something silly I'm doing. (I can post a simple example if it helps, but I suspect others have this working just fine.)
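
For reference, a minimal sketch of this kind of handler, assuming the 0.x prefect.engine.result_handlers.ResultHandler base class; note that write()/read() are generally only invoked when checkpointing is enabled (it is off by default for local runs, e.g. via PREFECT__FLOWS__CHECKPOINTING=true), which is a common reason a handler never seems to fire.

from prefect import task
from prefect.engine.result_handlers import ResultHandler

class PrintingHandler(ResultHandler):
    def write(self, result):
        # persist the result somewhere and return a token/URI that read() can use later
        print(f"write called with: {result}")
        return str(result)

    def read(self, token):
        print(f"read called with: {token}")
        return token

@task(checkpoint=True, result_handler=PrintingHandler())
def add_one(x):
    return x + 1
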

Brian McFeeley — 07/25/2019, 3:30 PM
@task
def extract():
    return [1, 2, 3]

@task
def transform1(x):
    return [x, x * 2, x * 3]

@task
def load(x):
    print(f"output: {x}")

if __name__ == '__main__':
    with Flow("mapping test") as flow:
        e = extract()
        t1 = transform1.map(e)
        l = load(t1)
    flow.run()

This works like we expect and generates a new list for each of the items in the original list, with output [[1, 2, 3], [2, 4, 6], [3, 6, 9]].
However, let's say I wanted to then map over each of the items in each of these sublists. If I add a new task:

@task
def transform2(x):
    return x * 2

And then change my flow to:

e = extract()
t1 = transform1.map(e)
t2 = transform2.map(t1)
l = load(t2)

I see that the second task received the lists as a whole in its mapped execution, rather than mapping over the internal scalars, so the output is [[1, 2, 3, 1, 2, 3], [2, 4, 6, 2, 4, 6], [3, 6, 9, 3, 6, 9]], i.e. we duplicated each list instead of multiplying each element by 2.
An example from real life here is maybe:
• generate or grab a list of s3 buckets
• get the list of files in each bucket
• process each file individually
Any ideas? My docs search is coming up dry.
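
One workaround sketch for the nested case (illustrative, reusing extract, transform1, transform2, and load from the message above): add a plain, non-mapped task that flattens the list of lists before the second map, so transform2 receives scalars. For the s3 example, the flatten step could emit (bucket, key) pairs so the grouping isn't lost.

from prefect import Flow, task

@task
def flatten(nested):
    # collapse a list of lists into one flat list
    return [item for sublist in nested for item in sublist]

with Flow("nested mapping test") as flow:
    e = extract()
    t1 = transform1.map(e)      # [[1, 2, 3], [2, 4, 6], [3, 6, 9]]
    flat = flatten(t1)          # [1, 2, 3, 2, 4, 6, 3, 6, 9]
    t2 = transform2.map(flat)   # now maps over scalars
    l = load(t2)

flow.run()
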

Brian McFeeley — 07/25/2019, 5:43 PM

Chris Hart — 07/26/2019, 1:44 AM

Chris Hart — 07/26/2019, 1:45 AM

Chris — 07/26/2019, 7:10 AM
there appear to be 1 leaked semaphores to clean up at shutdown. Also, rather than killing the flow or restarting the failed task, it restarts upstream tasks which were successfully run. Does anyone know what the cause of failure might be and whether it's possible to only restart the failed task?

paularmand — 07/26/2019, 1:42 PM

paularmand — 07/26/2019, 1:42 PM

Chris Hart — 07/26/2019, 5:14 PM

Chris Hart — 07/26/2019, 5:16 PM
has_next_page: true/false that could be used for optionally triggering the task again by passing in the limit/offset/cursor etc. Are there any tutorials or examples of mapping with loops?
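
One pattern sketch for has_next_page-style pagination (illustrative; fetch_page is a hypothetical helper standing in for the actual GraphQL call): loop inside a single task and accumulate pages, then map downstream tasks over the combined records rather than re-triggering the fetch task itself.

from prefect import task

@task
def fetch_all(page_size=100):
    records, cursor = [], None
    while True:
        # fetch_page is hypothetical: it returns
        # {"records": [...], "has_next_page": bool, "end_cursor": str}
        page = fetch_page(limit=page_size, cursor=cursor)
        records.extend(page["records"])
        if not page["has_next_page"]:
            return records
        cursor = page["end_cursor"]
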

Chris Hart — 07/26/2019, 5:24 PM

Chris Hart — 07/26/2019, 6:46 PM

Chris Hart — 07/29/2019, 4:26 PM

Chris White

Brian McFeeley — 07/29/2019, 7:02 PM
Unexpected error occured in FlowRunner: ModuleNotFoundError("No module named 'utils'"), referencing shared static functions in a package called utils. Any ideas how I might go about debugging this to make sure that code is available at the time of execution? When I look at the output from the dask workers themselves, it looks like they fail while trying to deserialize a task:
distributed.worker - WARNING - Could not deserialize task
Traceback (most recent call last):
File "/Users/bmcfeeley/.virtualenvs/spark3.7/lib/python3.7/site-packages/distributed/worker.py", line 1272, in add_task
self.tasks[key] = _deserialize(function, args, kwargs, task)
File "/Users/bmcfeeley/.virtualenvs/spark3.7/lib/python3.7/site-packages/distributed/worker.py", line 3060, in _deserialize
function = pickle.loads(function)
File "/Users/bmcfeeley/.virtualenvs/spark3.7/lib/python3.7/site-packages/distributed/protocol/pickle.py", line 61, in loads
return pickle.loads(x)
ModuleNotFoundError: No module named 'utils'
Does all the code have to live in the same file as the flow/task definitions somehow?
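
Not Prefect-specific, but a sketch of the two usual fixes: install the utils package (or put it on PYTHONPATH) in every dask worker's environment, or push a single module (or egg/zip) to already-running workers with distributed.Client.upload_file; the scheduler address below is a placeholder.

from distributed import Client

# Connect to the same scheduler the flow uses (address is a placeholder).
client = Client("tcp://scheduler-address:8786")

# Ship the module so workers can unpickle tasks that reference it.
client.upload_file("utils.py")
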

Brian McFeeley — 07/29/2019, 9:45 PM
distributed.client - WARNING - Couldn't gather 4 keys, rescheduling {'extract-2e6b9db4-5567-486a-af2f-8ad07bc6a77b': ('<tcp://172.17.0.2:43849>',), 'transform2-b5e3d1d0-fb14-44f6-afe6-6b47ec5ab277': ('<tcp://172.17.0.2:43849>',), 'transform1-8156dfd1-1a53-4213-bf66-c70d1aab724f': ('<tcp://172.17.0.2:43849>',), 'load-fa2ae950-213e-4044-9773-fdacd7b057b4': ('<tcp://172.17.0.2:43849>',)}
I'm trying to deploy my cluster in Aptible, and I suspect I've not set up enough plumbing on exposing the right ports so the scheduler and worker processes can communicate. Does this ring any bells?

Brett Papineau — 07/29/2019, 10:29 PM

Jie Lou — 07/30/2019, 4:19 PM

Chris Hart — 07/30/2019, 10:16 PM